{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Visualizing datasets larger than memory using Datashader with Dask\n",
    "\n",
    "## Datashading a 2.7-billion-point Open Street Map database\n",
    "\n",
    "Most [datashader](https://github.com/bokeh/datashader) examples use \"medium-sized\" datasets, because they need to be small enough to be distributed over the internet without racking up huge bandwidth charges for the project maintainers. Even though these datasets can be relatively large (such as the [1-billion point Open Street Map example](https://anaconda.org/jbednar/osm-1billion)), they still fit into memory on a 16GB laptop.\n",
    "\n",
    "Because Datashader supports [Dask](http://dask.pydata.org) dataframes, it also works well with truly large datasets, much bigger than will fit in any one machine's physical memory. On a single machine, Dask will automatically and efficiently page in the data as needed, and you can also easily distribute the data and computation across multiple machines. Here we illustrate how to work \"out of core\" on a single machine using a 22GB OSM dataset containing 2.7 billion points.\n",
    "\n",
    "The data is taken from Open Street Map's (OSM) [bulk GPS point data](https://blog.openstreetmap.org/2012/04/01/bulk-gps-point-data/), and is unfortunately too large to distribute with Datashader (7GB compressed). The data was collected by OSM contributors' GPS devices, and was provided as a CSV file of `latitude,longitude` coordinates. The data was downloaded from their website, extracted, converted to use positions in Web Mercator format using `datashader.utils.lnglat_to_meters()`, and then stored in a [parquet](https://github.com/dask/fastparquet) file for [faster disk access](https://github.com/bokeh/datashader/issues/129#issuecomment-300515690). To run this notebook, you would need to do the same process yourself to obtain `osm.snappy.parq`.   Once you have it, you can follow the steps below to load and plot the data."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask.dataframe as dd\n",
    "import dask.diagnostics as diag\n",
    "import datashader as ds\n",
    "import datashader.transfer_functions as tf"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = dd.io.parquet.read_parquet('../data/osm.snappy.parq')\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Aggregation\n",
    "\n",
    "First, we create a canvas to provide pixel-shaped bins in which points can be aggregated, and then aggregate the data to produce a fixed-size aggregate array.  This process may take up to a minute, so we provide a progress bar using dask:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "bound = 20026376.39\n",
    "bounds = dict(x_range = (-bound, bound), y_range = (int(-bound*0.4), int(bound*0.6)))\n",
    "plot_width = 1000\n",
    "plot_height = int(plot_width*0.5)\n",
    "\n",
    "cvs = ds.Canvas(plot_width=plot_width, plot_height=plot_height, **bounds)\n",
    "\n",
    "with diag.ProgressBar(), diag.Profiler() as prof, diag.ResourceProfiler(0.5) as rprof:\n",
    "    agg = cvs.points(df, 'x', 'y', ds.count())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can now visualize this data very quickly, ignoring low-count noise as described in the [1-billion point OSM version](https://anaconda.org/jbednar/osm-1billion):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.shade(agg.where(agg > 15), cmap=[\"lightblue\", \"darkblue\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Performance Profile\n",
    "\n",
    "Dask offers some tools to visualize how memory and processing power are being used during these calculations:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from bokeh.io import output_notebook\n",
    "from bokeh.resources import CDN\n",
    "output_notebook(CDN, hide_banner=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "diag.visualize([prof, rprof])\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Performance notes:\n",
    "- On a 16GB machine, most of the time is spent reading the data from disk (the purple rectangles)\n",
    "- Reading time includes not just disk I/O, but decompressing chunks of data\n",
    "- The disk reads don't release the [Global Interpreter Lock](https://wiki.python.org/moin/GlobalInterpreterLock) (GIL), and so CPU usage (see second chart above) drops to only one core during those periods.\n",
    "- During the aggregation steps (the green rectangles), CPU usage on this machine with 8 hyperthreaded cores (4 full cores) spikes to nearly 800%, because the aggregation function is implemented in parallel. \n",
    "- The data takes up 22 GB uncompressed, but only a peak of around 6 GB of physical memory is ever used because the data is paged in as needed."
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
